1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.codec.http1xcodec;
12 import kiss.logger;
13 import collie.codec.http.codec.httpcodec;
14 import collie.codec.http.errocode;
15 import collie.codec.http.headers;
16 import collie.codec.http.httpmessage;
17 import collie.codec.http.httptansaction;
18 import collie.codec.http.parser;
19 import collie.utils.string;
20 import std.array;
21 import std.conv;
22 import std.traits;
23 
24 class HTTP1XCodec : HTTPCodec
25 {
26 	this(TransportDirection direction, uint maxHeaderSize = (64 * 1024))
27 	{
28 		_transportDirection = direction;
29 		_finished = true;
30 		_maxHeaderSize = maxHeaderSize;
31 		_parser.onUrl(&onUrl);
32 		_parser.onMessageBegin(&onMessageBegin);
33 		_parser.onHeaderComplete(&onHeadersComplete);
34 		_parser.onHeaderField(&onHeaderField);
35 		_parser.onHeaderValue(&onHeaderValue);
36 		_parser.onStatus(&onStatus);
37 		_parser.onChunkHeader(&onChunkHeader);
38 		_parser.onChunkComplete(&onChunkComplete);
39 		_parser.onBody(&onBody);
40 		_parser.onMessageComplete(&onMessageComplete);
41 	}
42 
43 	override CodecProtocol getProtocol() {
44 		return CodecProtocol.HTTP_1_X;
45 	}
46 
47 	override TransportDirection getTransportDirection()
48 	{
49 		return _transportDirection;
50 	}
51 
52 	override StreamID createStream() {
53 		return 0;
54 	}
55 
56 	override bool isBusy() {
57 		return !_finished;
58 	}
59 
60 	override bool shouldClose()
61 	{
62 		return !_keepalive;
63 	}
64 
65 	override void setParserPaused(bool paused){}
66 
67 	override void setCallback(CallBack callback) {
68 		_callback = callback;
69 	}
70 
71 	override size_t onIngress(ubyte[] buf)
72 	{
73 		version(CollieDebugMode) logDebug("on Ingress!!");
74 		if(_finished) {
75 			_parser.rest(HTTPParserType.HTTP_BOTH,_maxHeaderSize);
76 		}
77 		auto size = _parser.httpParserExecute(buf);
78 		if(size != buf.length && _parser.isUpgrade == false && _transaction && _callback){
79 				_callback.onError(_transaction,HTTPErrorCode.PROTOCOL_ERROR);
80 		}
81 		return cast(size_t) size;
82 	}
83 
84 	override void onConnectClose()
85 	{
86 		if(_transaction){
87 			_transaction.onErro(HTTPErrorCode.REMOTE_CLOSED);
88 			_transaction.handler = null;
89 			_transaction.transport = null;
90 			_transaction = null;
91 		}
92 	}
93 
94 	override void onTimeOut()
95 	{
96 		if(_transaction){
97 			_transaction.onErro(HTTPErrorCode.TIME_OUT);
98 		}
99 	}
100 
101 	override void detach(HTTPTransaction txn)
102 	{
103 		if(txn is _transaction)
104 			_transaction = null;
105 	}
106 
107 	override size_t generateHeader(
108 		HTTPTransaction txn,
109 		HTTPMessage msg,
110 		HttpWriteBuffer buffer,
111 		bool eom = false)
112 	{
113 		const bool upstream = (_transportDirection == TransportDirection.UPSTREAM);
114 		const size_t beforLen = buffer.length;
115 		auto hversion = msg.getHTTPVersion();
116 		_egressChunked = msg.chunked && !_egressUpgrade;
117 		_lastChunkWritten = false;
118 		bool hasTransferEncodingChunked = false;
119 		bool hasUpgradeHeader = false;
120 		bool hasDateHeader = false;
121 		bool is1xxResponse = false;
122 		bool ingorebody = false;
123 		_keepalive = _keepalive & msg.wantsKeepAlive;
124 		if(!upstream) {
125 			is1xxResponse = msg.is1xxResponse;
126 			appendLiteral(buffer,"HTTP/");
127 			appendLiteral(buffer,to!string(hversion.maj));
128 			appendLiteral(buffer,".");
129 			appendLiteral(buffer,to!string(hversion.min));
130 			appendLiteral(buffer," ");
131 			ushort code = msg.statusCode;
132 			ingorebody = responseBodyMustBeEmpty(code);
133 			appendLiteral(buffer,to!string(code));
134 			appendLiteral(buffer," ");
135 			appendLiteral(buffer,msg.statusMessage);
136 		} else {
137 			appendLiteral(buffer,msg.methodString);
138 			appendLiteral(buffer," ");
139 			appendLiteral(buffer,msg.getPath);
140 			appendLiteral(buffer," HTTP/");
141 			appendLiteral(buffer,to!string(hversion.maj));
142 			appendLiteral(buffer,".");
143 			appendLiteral(buffer,to!string(hversion.min));
144 			_mayChunkEgress = (hversion.maj == 1) && (hversion.min >= 1);
145 		}
146 		appendLiteral(buffer,"\r\n");
147 		_egressChunked &= _mayChunkEgress;
148 		string contLen;
149 		string upgradeHeader;
150 		foreach(HTTPHeaderCode code,string key,string value; msg.getHeaders)
151 		{
152 			if(code == HTTPHeaderCode.CONTENT_LENGTH){
153 				contLen = value;
154 				continue;
155 			} else if (code ==  HTTPHeaderCode.CONNECTION) {
156 				if(isSameIngnoreLowUp(value,"close")) {
157 					_keepalive = false;
158 				}
159 				continue;
160 			} else if(code == HTTPHeaderCode.UPGRADE){
161 				if(upstream) upgradeHeader = value;
162 				hasUpgradeHeader = true;
163 			}  else if (!hasTransferEncodingChunked &&
164 				code == HTTPHeaderCode.TRANSFER_ENCODING) {
165 				if(!isSameIngnoreLowUp(value,"chunked")) 
166 					continue;
167 				hasTransferEncodingChunked = true;
168 				if(!_mayChunkEgress) 
169 					continue;
170 			} 
171 			appendLiteral(buffer,key);
172 			appendLiteral(buffer,": ");
173 			appendLiteral(buffer,value);
174 			appendLiteral(buffer,"\r\n");
175 		}
176 		_inChunk = false;
177 		bool bodyCheck = ((!upstream) && _keepalive && !ingorebody  && !_egressUpgrade) ||
178 				// auto chunk POSTs and any request that came to us chunked
179 				(upstream && ((msg.method == HTTPMethod.HTTP_POST) || _egressChunked));
180 		// TODO: 400 a 1.0 POST with no content-length
181 		// clear egressChunked_ if the header wasn't actually set
182 		_egressChunked &= hasTransferEncodingChunked;
183 		if(bodyCheck && contLen.length == 0 && !_egressChunked){
184 			if (!hasTransferEncodingChunked && _mayChunkEgress) {
185 				appendLiteral(buffer,"Transfer-Encoding: chunked\r\n");
186 				_egressChunked = true;
187 			} else {
188 				_keepalive = false;
189 			}
190 		}
191 		if(!is1xxResponse || upstream || hasUpgradeHeader){
192 			appendLiteral(buffer,"Connection: ");
193 			if(hasUpgradeHeader) {
194 				appendLiteral(buffer,"upgrade\r\n");
195 				_keepalive = true;
196 			} else if(_keepalive)
197 				appendLiteral(buffer,"keep-alive\r\n");
198 			else
199 				appendLiteral(buffer,"close\r\n");
200 		}
201 		appendLiteral(buffer,"Server: Collie\r\n");
202 		if(contLen.length > 0){
203 			appendLiteral(buffer,"Content-Length: ");
204 			appendLiteral(buffer,contLen);
205 			appendLiteral(buffer,"\r\n");
206 		}
207 
208 		appendLiteral(buffer,"\r\n");
209 		return buffer.length - beforLen;
210 	}
211 
212 	override size_t generateBody(HTTPTransaction txn,
213 		HttpWriteBuffer chain,in ubyte[] data,
214 		bool eom)
215 	{
216 		size_t rlen = chain.write(data);
217 		if(_egressChunked && _inChunk) {
218 			appendLiteral(chain,"\r\n");
219 			_inChunk = false;
220 			rlen += 2;
221 		}
222 		if(eom)
223 			rlen += generateEOM(txn,chain);
224 		return rlen;
225 	}
226 
227 	override size_t generateChunkHeader(
228 		HTTPTransaction txn,
229 		HttpWriteBuffer buffer,
230 		size_t length)
231 	{
232 		logDebug("_egressChunked  ", _egressChunked);
233 		if (_egressChunked){
234 			import std.format;
235 			_inChunk = true;
236 			string lent = format("%x\r\n",length);
237 			logDebug("length is : ", length, "  x is: ", lent);
238 			appendLiteral(buffer,lent);
239 			return lent.length;
240 		}
241 		return 0;
242 	}
243 
244 
245 	override size_t generateChunkTerminator(
246 		HTTPTransaction txn,
247 		HttpWriteBuffer buffer)
248 	{
249 		if(_egressChunked && _inChunk)
250 		{
251 			_inChunk = false;
252 			appendLiteral(buffer,"\r\n");
253 			return 2;
254 		}
255 		return 0;
256 	}
257 
258 	override size_t generateEOM(HTTPTransaction txn,
259 		HttpWriteBuffer buffer)
260 	{
261 		size_t rlen = 0;
262 		if(_egressChunked) {
263 			assert(!_inChunk);
264 			if (_headRequest && _transportDirection == TransportDirection.DOWNSTREAM) {
265 				_lastChunkWritten = true;
266 			} else {
267 				// appending a 0\r\n only if it's not a HEAD and downstream request
268 				if (!_lastChunkWritten) {
269 					_lastChunkWritten = true;
270 					//if (!(_headRequest &&
271 					//		transportDirection_ == TransportDirection.DOWNSTREAM)) {
272 					appendLiteral(buffer,"0\r\n");
273 					rlen += 3;
274 					//}
275 				}
276 				appendLiteral(buffer,"\r\n");
277 			}
278 			rlen += 2;
279 		}
280 		switch (_transportDirection) {
281 			case TransportDirection.DOWNSTREAM:
282 				_responsePending = false;
283 				break;
284 			case TransportDirection.UPSTREAM:
285 				_requestPending = false;
286 				break;
287 			default:
288 				break;
289 		}
290 		return rlen;
291 	}
292 
293 	override size_t  generateRstStream(HTTPTransaction txn,
294 		HttpWriteBuffer buffer,HTTPErrorCode code)
295 	{
296 		return 0;
297 	}
298 protected:
299 
300 	final void appendLiteral(T)(HttpWriteBuffer buffer, T[] data) if(isSomeChar!(Unqual!T) || is(Unqual!T == byte) || is(Unqual!T == ubyte))
301 	{
302 		buffer.write(cast(const (ubyte[]))data);
303 	}
304 
305 	void onMessageBegin(ref HTTPParser){
306 		_finished = false;
307 		_headersComplete = false;
308 		_message = new HTTPMessage();
309 		if (_transportDirection == TransportDirection.DOWNSTREAM) {
310 			_requestPending = true;
311 			_responsePending = true;
312 		}
313 		// If there was a 1xx on this connection, don't increment the ingress txn id
314 		if (_transportDirection == TransportDirection.DOWNSTREAM ||
315 			!_is1xxResponse) {
316 		}
317 		if (_transportDirection == TransportDirection.UPSTREAM) {
318 			_is1xxResponse = false;
319 		}
320 		_transaction = new HTTPTransaction(_transportDirection,0,0);
321 		if(_callback)
322 			_callback.onMessageBegin(_transaction, _message);
323 		_currtKey = (ubyte[]).init;
324 		_currtValue = (ubyte[]).init;
325 	}
326 	
327 	void onHeadersComplete(ref HTTPParser parser){
328 		_mayChunkEgress = ((parser.major == 1) && (parser.minor >= 1));
329 		_message.setHTTPVersion(cast(ubyte)parser.major, cast(ubyte)parser.minor);
330 		_egressUpgrade = parser.isUpgrade;
331 		_message.upgraded(parser.isUpgrade);
332 		int klive = parser.keepalive;
333 		version(CollieDebugMode) logDebug("++++++++++klive : ", klive);
334 		switch(klive){
335 			case 1:
336 				_keepalive = true;
337 				break;
338 			case 2:
339 				_keepalive = false;
340 				break;
341 			default :
342 				_keepalive = false;
343 		}
344 		_message.wantsKeepAlive(_keepalive);
345 		_headersComplete = true;
346 		if(_message.upgraded){
347 			string upstring  = _message.getHeaders.getSingleOrEmpty(HTTPHeaderCode.UPGRADE);
348 			CodecProtocol pro = getProtocolFormString(upstring);
349 			if(_callback)
350 				_callback.onNativeProtocolUpgrade(_transaction,pro,upstring,_message);
351 		} else {
352 			if(_callback)
353 				_callback.onHeadersComplete(_transaction,_message);
354 		}
355 	}
356 	
357 	void onMessageComplete(ref HTTPParser parser){
358 		_finished = true;
359 		switch (_transportDirection) {
360 			case TransportDirection.DOWNSTREAM:
361 			{
362 				_requestPending = false;
363 				// else there was no match, OR we upgraded to http/1.1 OR someone specified
364 				// a non-native protocol in the setAllowedUpgradeProtocols.  No-ops
365 				break;
366 			}
367 			case TransportDirection.UPSTREAM:
368 				_responsePending = _is1xxResponse;
369 				break;
370 			default: break;
371 		}
372 		if(_callback)
373 			_callback.onMessageComplete(_transaction,parser.isUpgrade);
374 	}
375 	
376 	void onChunkHeader(ref HTTPParser parser){
377 		if(_callback)
378 			_callback.onChunkHeader(_transaction,cast(size_t)parser.contentLength);
379 	}
380 	
381 	void onChunkComplete(ref HTTPParser parser){
382 		if(_callback)
383 			_callback.onChunkComplete(_transaction);
384 	}
385 	
386 	void onUrl(ref HTTPParser parser, ubyte[] data, bool finish)
387 	{
388 		//logDebug("on Url");
389 		_message.method = parser.methodCode();
390 		_connectRequest = (parser.methodCode() == HTTPMethod.HTTP_CONNECT);
391 		
392 		// If this is a headers-only request, we shouldn't send
393 		// an entity-body in the response.
394 		_headRequest = (parser.methodCode() == HTTPMethod.HTTP_HEAD);
395 
396 		_currtKey ~= data;
397 		if(finish) {
398 			_message.url = cast(string)(_currtKey);
399 			_currtKey = (ubyte[]).init;
400 		}
401 	}
402 	
403 	void onStatus(ref HTTPParser parser, ubyte[] data, bool finish)
404 	{
405 
406 		_currtKey ~= data;
407 		if(finish) {
408 			string sdata = cast(string)_currtKey;
409 			_currtKey = (ubyte[]).init;
410 			_message.statusCode(cast(ushort)parser.statusCode);
411 			_message.statusMessage(sdata);
412 		}
413 	}
414 	
415 	void onHeaderField(ref HTTPParser parser, ubyte[] data, bool finish)
416 	{
417 		//logDebug("on onHeaderField");
418 		_currtKey ~= data;
419 	}
420 	
421 	void onHeaderValue(ref HTTPParser parser, ubyte[] data, bool finish)
422 	{
423 	//	logDebug("on onHeaderField");
424 		_currtValue ~= data;
425 		if(finish){
426 			string key = cast(string)_currtKey;
427 			_currtKey = (ubyte[]).init;
428 			string value = cast(string)_currtValue;
429 			_currtValue  = (ubyte[]).init;
430 			version(CollieDebugMode) logDebug("http header: \t", key, " : ", value);
431 			_message.getHeaders.add(key,value);
432 		}
433 	}
434 	
435 	void onBody(ref HTTPParser parser, ubyte[] data, bool finish)
436 	{
437 		version(CollieDebugMode) debug logDebug("on boday, length : ", data.length);
438 		_callback.onBody(_transaction,data);
439 	}
440 
441 	bool responseBodyMustBeEmpty(ushort status) {
442 		return (status == 304 || status == 204 ||
443 			(100 <= status && status < 200));
444 	}
445 private:
446 	TransportDirection _transportDirection;
447 	CallBack _callback;
448 	HTTPTransaction _transaction;
449 	HTTPMessage _message;
450 	ubyte[] _currtKey;
451 	ubyte[] _currtValue;
452 	HTTPParser _parser;
453 
454 	uint _maxHeaderSize;
455 	bool _finished;
456 private:
457 	bool _parserActive = false;
458 	bool _pendingEOF = false;
459 	bool _parserPaused = false;
460 	bool _parserError = false;
461 	bool _requestPending = false;
462 	bool _responsePending = false;
463 	bool _egressChunked = false;
464 	bool _inChunk = false;
465 	bool _lastChunkWritten = false;
466 	bool _keepalive = false;
467 	bool _disableKeepalivePending = false;
468 	bool _connectRequest = false;
469 	bool _headRequest = false;
470 	bool _expectNoResponseBody = false;
471 	bool _mayChunkEgress = false;
472 	bool _is1xxResponse = false;
473 	bool _inRecvLastChunk = false;
474 	bool _ingressUpgrade = false;
475 	bool _ingressUpgradeComplete = false;
476 	bool _egressUpgrade = false;
477 	bool _nativeUpgrade = false;
478 	bool _headersComplete = false;
479 }
480